package org.budo.warehouse.logic.filter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.budo.support.java.regex.util.RegexUtil;
import org.budo.support.lang.util.StringUtil;
import org.budo.support.spring.expression.util.SpelUtil;
import org.budo.time.Time;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.api.DataMessagePojo;
import org.budo.warehouse.logic.api.IEventFilterLogic;
import org.budo.warehouse.service.api.IEntryBufferService;
import org.budo.warehouse.service.entity.EntryBuffer;
import org.budo.warehouse.service.entity.Pipeline;
import org.springframework.stereotype.Service;

/**
 * Decides, per pipeline, which entries of an incoming {@link DataMessage} are
 * passed on. An entry is kept only when it matches the pipeline's schema
 * pattern, table pattern and event filter expression; each of these accepts
 * everything when it is not configured on the pipeline.
 *
 * @author lmw
 */
@Service
public class EventFilterLogicImpl implements IEventFilterLogic {
    /** Schema name of the periodic health-check statement. */
    private static final String HEALTH_CHECK_SCHEMA = "mysql";

    /** Table name of the periodic health-check statement. */
    private static final String HEALTH_CHECK_TABLE = "ha_health_check";

    /** Event type of the periodic health-check statement. */
    private static final String HEALTH_CHECK_EVENT_TYPE = "CREATE";

    /** Every event filter expression must start with this prefix. */
    private static final String EVENT_FILTER_PREFIX = "#{";

    @Resource
    private IEntryBufferService entryBufferService;

    /**
     * Builds a new message that contains only the entries accepted for the
     * given pipeline. The id and data node id are copied from the input
     * message; the input message itself is not modified.
     *
     * @param pipeline    pipeline whose filter configuration is applied
     * @param dataMessage message whose entries are filtered
     * @return a new {@link DataMessagePojo} holding the accepted entries; the
     *         entry list is empty when nothing matches or when the input
     *         message carries no entry list
     * @throws IllegalArgumentException if the pipeline's event filter is
     *                                  configured but does not start with
     *                                  <code>#{</code>
     */
    @Override
    public DataMessage filter(Pipeline pipeline, DataMessage dataMessage) {
        List<DataEntry> dataEntries = dataMessage.getDataEntries();

        List<DataEntry> filteredEntries = new ArrayList<DataEntry>();
        if (null != dataEntries) { // a message without an entry list yields an empty result instead of an NPE
            for (DataEntry dataEntry : dataEntries) {
                if (this.filterEntry(pipeline, dataEntry)) {
                    filteredEntries.add(dataEntry);
                }
            }
        }

        return new DataMessagePojo(dataMessage.getId(), dataMessage.getDataNodeId(), filteredEntries);
    }

    /**
     * Tells whether a single entry is accepted for the pipeline.
     * <p>
     * The periodic health-check statement is never accepted; it is only
     * recorded in the entry buffer.
     *
     * @return {@code true} when the entry should be kept
     * @see org.budo.warehouse.logic.task.BudoWarehouseHaHealthCheckTask#warehouseHealthCheck()
     */
    private boolean filterEntry(Pipeline pipeline, DataEntry dataEntry) {
        String schemaName = dataEntry.getSchemaName();
        String tableName = dataEntry.getTableName();
        String eventType = dataEntry.getEventType();

        // Scheduled health check
        // https://code.aliyun.com/wanshao/canal/commit/96dca0b9e6b6dcc12fc439d2f17d85a4656485df
        if (StringUtil.equals(HEALTH_CHECK_SCHEMA, schemaName) //
                && StringUtil.equals(HEALTH_CHECK_TABLE, tableName) //
                && StringUtil.equals(HEALTH_CHECK_EVENT_TYPE, eventType)) {
            this.insertEntryBuffer(pipeline, dataEntry); // keep a record of it
            return false;
        }

        return this.bySchema(pipeline, dataEntry) //
                && this.byTable(pipeline, dataEntry) //
                && this.byEventFilter(pipeline, dataEntry);
    }

    /**
     * Persists the entry as an {@link EntryBuffer} row for the pipeline.
     */
    private void insertEntryBuffer(Pipeline pipeline, DataEntry dataEntry) {
        EntryBuffer entryBuffer = new EntryBuffer() //
                .setPipelineId(pipeline.getId()) //
                .setEventType(dataEntry.getEventType()) //
                .setSchemaName(dataEntry.getSchemaName()) //
                .setTableName(dataEntry.getTableName()) //
                .setSql(dataEntry.getSql()) //
                // Stored as already flushed: the row is kept for the record only, not for deferred execution.
                .setFlushedAt(Time.now().toTimestamp());
        entryBufferService.insert(entryBuffer);
    }

    /**
     * Filters by schema name, using the pipeline's source schema as a regular
     * expression.
     */
    private boolean bySchema(Pipeline pipeline, DataEntry dataEntry) {
        return this.matchesPattern(pipeline.getSourceSchema(), dataEntry.getSchemaName());
    }

    /**
     * Filters by table name, using the pipeline's source table as a regular
     * expression.
     */
    private boolean byTable(Pipeline pipeline, DataEntry dataEntry) {
        return this.matchesPattern(pipeline.getSourceTable(), dataEntry.getTableName());
    }

    /**
     * Shared matching rule of {@link #bySchema} and {@link #byTable}.
     *
     * @param pattern configured regular expression; empty means "not configured"
     * @param value   name taken from the entry
     * @return {@code true} when no pattern is configured or the value matches it
     */
    private boolean matchesPattern(String pattern, String value) {
        if (StringUtil.isEmpty(pattern)) {
            return true; // not configured: accept everything
        }

        // RegexUtil.matches returns a boxed Boolean; comparing against Boolean.TRUE
        // avoids an unboxing NPE and treats a null result as "no match".
        // NOTE(review): whether RegexUtil.matches can actually return null is not visible here - confirm.
        return Boolean.TRUE.equals(RegexUtil.matches(pattern, value));
    }

    /**
     * Filters by event type and similar attributes through the pipeline's
     * event filter expression. The expression is handed to
     * {@link SpelUtil#merge} together with the variables {@code eventType},
     * {@code schemaName} and {@code tableName}; the entry is accepted only
     * when the merged result is exactly the string {@code "true"}.
     *
     * @throws IllegalArgumentException if the expression is configured but
     *                                  does not start with <code>#{</code>
     */
    private boolean byEventFilter(Pipeline pipeline, DataEntry dataEntry) {
        String eventFilter = pipeline.getEventFilter();
        if (StringUtil.isEmpty(eventFilter)) {
            return true; // not configured: accept everything
        }

        if (!eventFilter.startsWith(EVENT_FILTER_PREFIX)) {
            throw new IllegalArgumentException("#25 eventFilter=" + eventFilter + ", pipeline=" + pipeline);
        }

        Map<String, Object> variables = new HashMap<String, Object>();
        variables.put("eventType", dataEntry.getEventType());
        variables.put("schemaName", dataEntry.getSchemaName());
        variables.put("tableName", dataEntry.getTableName());

        String merged = SpelUtil.merge(eventFilter, variables);
        return "true".equals(merged);
    }

    // TODO filter by row content, for example:
    // #{ eventType == 'DELETE' && $row.schedule_date < _now } // schedule_date is earlier than
    // the current time
}